package com.zhang.req.page;

import com.zhang.second.utils.UserBehavior;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @title: Page-view (PV) count per hour — suffers from data skew
 * @author: zhang
 * @date: 2022/2/8 20:47
 *
 * <p>Reads a UserBehavior CSV file, keeps only {@code "pv"} events, and sums them per one-hour
 * event-time tumbling window. Every record is keyed by the same constant {@code "pv"}, so the
 * entire stream lands in a single key group and is aggregated by one parallel subtask; this job
 * deliberately illustrates that data-skew problem.
 */
public class PageViewCount {

    /** Input file read when no path is supplied on the command line. */
    private static final String DEFAULT_INPUT_PATH =
            "/Users/apple/IdeaProjects/flink_1.13/src/main/resources/UserBehavior.csv";

    /**
     * Builds and executes the PV-count job.
     *
     * @param args optional; {@code args[0]}, when present, overrides {@link #DEFAULT_INPUT_PATH}
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env
                .readTextFile(inputPath)
                .map(new MapFunction<String, UserBehavior>() {
                    @Override
                    public UserBehavior map(String value) throws Exception {
                        String[] fields = value.split(",");
                        // The fifth column is scaled by 1000 — presumably epoch seconds -> the
                        // milliseconds Flink expects for event time (confirm against the data).
                        return new UserBehavior(
                                fields[0], fields[1], fields[2], fields[3],
                                Long.parseLong(fields[4]) * 1000L
                        );
                    }
                })
                // Constant-first comparison so a null type is filtered out instead of throwing.
                .filter(r -> "pv".equals(r.type))
                .assignTimestampsAndWatermarks(
                        // Monotonous watermarks assume the file is ordered by timestamp;
                        // out-of-order records would be treated as late. NOTE(review): confirm.
                        WatermarkStrategy.<UserBehavior>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() {
                                    @Override
                                    public long extractTimestamp(UserBehavior element, long recordTimestamp) {
                                        return element.ts;
                                    }
                                })
                )
                .map(r -> Tuple2.of("pv", 1))
                // The lambda's Tuple2 generics are erased, so the result type must be declared.
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                // Every record shares the key "pv": all counting runs on one subtask (the skew).
                .keyBy(r -> r.f0)
                .window(TumblingEventTimeWindows.of(Time.hours(1)))
                .sum(1)
                .print();

        env.execute();
    }
}
